package com.fwmagic.dynamic_rule.engine;

import com.alibaba.fastjson.JSON;
import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Real-time operations rule engine, version 1.0.
 * <p>
 * Rule implemented by this demo:
 * <ul>
 *   <li>Trigger condition: an E event.</li>
 *   <li>Profile condition: k3=v3, k10=v80, k23=v36.</li>
 *   <li>Behaviour count condition: U(p1=v3,p2=v2) at least 3 times and
 *       G(p6=v8,p4=v5,p1=v2) at least once.</li>
 *   <li>Behaviour sequence condition: W(p1=v4) -> R(p2=v3) -> F, in that order.</li>
 * </ul>
 */
public class RuleEngineDemo {

    /** Column family of the HBase profile table that holds the profile tags. */
    private static final byte[] PROFILE_FAMILY = "f".getBytes(StandardCharsets.UTF_8);

    /**
     * Builds and runs the job: Kafka source -> JSON parsing -> keyBy(deviceId) -> rule matching -> print.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        props.setProperty("auto.offset.reset", "latest");

        // Kafka source (diamond operator instead of the raw type)
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "yinew_applog",
                new SimpleStringSchema(),
                props);

        // Attach the source to the environment
        DataStreamSource<String> logStream = env.addSource(consumer);

        // Parse each JSON log line into a LogBean
        SingleOutputStreamOperator<LogBean> mapedStream = logStream.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String logJson) throws Exception {
                return JSON.parseObject(logJson, LogBean.class);
            }
        });

        // Drop records that parsed to null or carry no device id: they cannot be keyed
        SingleOutputStreamOperator<LogBean> validStream =
                mapedStream.filter(bean -> bean != null && bean.getDeviceId() != null);

        // Key the stream by user (device id)
        KeyedStream<LogBean, String> keyedStream = validStream.keyBy(new KeySelector<LogBean, String>() {
            @Override
            public String getKey(LogBean logBean) throws Exception {
                return logBean.getDeviceId();
            }
        });

        // Evaluate the rule for every incoming event of a user
        SingleOutputStreamOperator<ResultBean> process = keyedStream.process(new KeyedProcessFunction<String, LogBean, ResultBean>() {

            // Created in open(); never part of the serialized function
            private transient Connection connection;

            private transient Table table;

            // Keyed state holding every event seen so far for the current key
            private transient ListState<LogBean> listState;

            @Override
            public void open(Configuration parameters) throws Exception {
                org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
                conf.set("hbase.zookeeper.quorum", "hd1:2181,hd2:2181,hd3:2181");
                connection = ConnectionFactory.createConnection(conf);
                table = connection.getTable(TableName.valueOf("yinew_profile_new"));

                ListStateDescriptor<LogBean> stateDescriptor = new ListStateDescriptor<>("eventState", LogBean.class);
                listState = getRuntimeContext().getListState(stateDescriptor);
            }

            @Override
            public void close() throws Exception {
                // Release the table as well as the connection, and tolerate a failed open()
                try {
                    if (table != null) {
                        table.close();
                    }
                } finally {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }

            @Override
            public void processElement(LogBean logBean, Context context, Collector<ResultBean> collector) throws Exception {
                // Every event is appended to the history, whether or not it triggers the rule
                listState.add(logBean);

                // 1. Trigger condition: E event
                if (!"E".equals(logBean.getEventId())) {
                    return;
                }

                // 2. Profile condition: k3=v3, k10=v80, k23=v36 (looked up in HBase)
                Get get = new Get(logBean.getDeviceId().getBytes(StandardCharsets.UTF_8));
                get.addColumn(PROFILE_FAMILY, "k3".getBytes(StandardCharsets.UTF_8));
                get.addColumn(PROFILE_FAMILY, "k10".getBytes(StandardCharsets.UTF_8));
                get.addColumn(PROFILE_FAMILY, "k23".getBytes(StandardCharsets.UTF_8));

                Result result = table.get(get);
                boolean profileMatches = profileTagEquals(result, "k3", "v3")
                        && profileTagEquals(result, "k10", "v80")
                        && profileTagEquals(result, "k23", "v36");
                if (!profileMatches) {
                    return;
                }

                // Read the state once and reuse the snapshot for both remaining conditions
                List<LogBean> history = new ArrayList<>();
                for (LogBean bean : listState.get()) {
                    history.add(bean);
                }

                // 3. Behaviour count condition: U(p1=v3,p2=v2) >= 3 and G(p6=v8,p4=v5,p1=v2) >= 1
                int uCount = 0;
                int gCount = 0;
                for (LogBean bean : history) {
                    if ("U".equals(bean.getEventId())) {
                        if (hasProperty(bean, "p1", "v3") && hasProperty(bean, "p2", "v2")) {
                            uCount++;
                        }
                    } else if ("G".equals(bean.getEventId())) {
                        if (hasProperty(bean, "p6", "v8") && hasProperty(bean, "p4", "v5") && hasProperty(bean, "p1", "v2")) {
                            gCount++;
                        }
                    }
                }
                // Checked once, after counting: checking inside the loop emitted the same match repeatedly
                if (uCount < 3 || gCount < 1) {
                    return;
                }

                // 4. Behaviour sequence condition: W(p1=v4) -> R(p2=v3) -> F
                //    e.g. R W R W R R R R W R F
                int indexW = indexOfEvent(history, 0, "W", "p1", "v4");
                int indexR = indexW < 0 ? -1 : indexOfEvent(history, indexW + 1, "R", "p2", "v3");
                int indexF = indexR < 0 ? -1 : indexOfEvent(history, indexR + 1, "F", null, null);

                if (indexF > -1) {
                    System.out.println("====>匹配到啦！！！");
                    ResultBean resultBean = new ResultBean();
                    resultBean.setRuleId("rule-test-1");
                    resultBean.setDeviceId(logBean.getDeviceId());
                    resultBean.setTimeStamp(logBean.getTimeStamp());
                    collector.collect(resultBean);
                }
            }
        });

        process.print();


        env.execute();
    }

    /**
     * Tells whether a profile tag fetched from HBase equals the expected value.
     *
     * @param result    row returned by the profile lookup
     * @param qualifier column qualifier of the tag inside {@link #PROFILE_FAMILY}
     * @param expected  value the tag must have
     * @return {@code true} only if the column is present and its UTF-8 value equals {@code expected}
     */
    private static boolean profileTagEquals(Result result, String qualifier, String expected) {
        byte[] raw = result.getValue(PROFILE_FAMILY, qualifier.getBytes(StandardCharsets.UTF_8));
        // A missing row or column yields null; treat it as "condition not met" instead of failing
        return raw != null && expected.equals(new String(raw, StandardCharsets.UTF_8));
    }

    /**
     * Tells whether an event carries the given property value.
     *
     * @param bean     event to inspect
     * @param key      property name
     * @param expected required property value
     * @return {@code true} if the event has a property map in which {@code key} maps to {@code expected}
     */
    private static boolean hasProperty(LogBean bean, String key, String expected) {
        Map<String, String> properties = bean.getProperties();
        return properties != null && expected.equals(properties.get(key));
    }

    /**
     * Finds the first event at or after {@code from} with the given id and, optionally, property value.
     *
     * @param events   event history in state order
     * @param from     index at which the search starts (inclusive)
     * @param eventId  required event id
     * @param key      property name to check, or {@code null} when no property condition applies
     * @param expected required value of {@code key}; ignored when {@code key} is {@code null}
     * @return index of the first matching event, or {@code -1} if there is none
     */
    private static int indexOfEvent(List<LogBean> events, int from, String eventId, String key, String expected) {
        for (int i = from; i < events.size(); i++) {
            LogBean candidate = events.get(i);
            if (eventId.equals(candidate.getEventId()) && (key == null || hasProperty(candidate, key, expected))) {
                return i;
            }
        }
        return -1;
    }
}
